// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

/// Utility functions for setting up ports and sending data.
///
/// This library contains a number of functions that handle the
/// boiler-plate of setting up a receive port and receiving a
/// single message on the port.
///
/// There are different functions that offer different ways to
/// handle the incoming message.
///
/// The simplest function, [singleCallbackPort], takes a callback
/// and returns a port, and then calls the callback for the first
/// message sent on the port.
///
/// Other functions intercept the returned value and either
/// does something with it, or puts it into a [Future] or [Completer].
library isolate.ports;

import 'dart:async';
import 'dart:isolate';

import 'util.dart';

SendPort singleCallbackPort<P>(void Function(P response) callback,
    {@Deprecated("Use singleCallbackPortWithTimeout instead") Duration? timeout,
    @Deprecated("Use singleCallbackPortWithTimeout instead") P? timeoutValue}) {
  if (timeout == null) {
    return _singleCallbackPort<P>(callback);
  }
  if (timeoutValue is! P) {
    throw ArgumentError.value(
        null, "timeoutValue", "The result type is non-null");
  }
  return singleCallbackPortWithTimeout<P>(callback, timeout, timeoutValue);
}

/// Helper function for [singleCallbackPort].
///
/// Replace [singleCallbackPort] with this
/// when removing the deprecated parameters.
SendPort _singleCallbackPort<P>(void Function(P) callback) {
  var responsePort = RawReceivePort();
  var zone = Zone.current;
  callback = zone.registerUnaryCallback(callback);
  responsePort.handler = (response) {
    responsePort.close();
    zone.runUnary(callback, response as P);
  };
  return responsePort.sendPort;
}

/// Create a [SendPort] that accepts only one message.
SendPort singleCallbackPortWithTimeout<P>(
    void Function(P response) callback, Duration timeout, P timeoutValue) {
  var responsePort = RawReceivePort();
  var zone = Zone.current;
  callback = zone.registerUnaryCallback(callback);
  Timer? timer;
  responsePort.handler = (response) {
    responsePort.close();
    timer?.cancel();
    zone.runUnary(callback, response as P);
  };

  timer = Timer(timeout, () {
    responsePort.close();
    callback(timeoutValue);
  });

  return responsePort.sendPort;
}

/// Create a [SendPort] that accepts only one message.
SendPort singleCompletePort<R, P>(
  Completer<R> completer, {
  FutureOr<R> Function(P message)? callback,
  Duration? timeout,
  FutureOr<R> Function()? onTimeout,
}) {
  if (callback == null && timeout == null) {
    return _singleCallbackPort<Object>((response) {
      _castComplete<R>(completer, response);
    });
  }
  var responsePort = RawReceivePort();
  Timer? timer;
  if (callback == null) {
    responsePort.handler = (response) {
      responsePort.close();
      timer?.cancel();
      _castComplete<R>(completer, response);
    };
  } else {
    var zone = Zone.current;
    var action = zone.registerUnaryCallback((response) {
      try {
        // Also catch it if callback throws.
        completer.complete(callback(response as P));
      } catch (error, stack) {
        completer.completeError(error, stack);
      }
    });
    responsePort.handler = (response) {
      responsePort.close();
      timer?.cancel();
      zone.runUnary(action, response as P);
    };
  }
  if (timeout != null) {
    timer = Timer(timeout, () {
      responsePort.close();
      if (onTimeout != null) {
        /// workaround for incomplete generic parameters promotion.
        /// example is available in 'TimeoutFirst with invalid null' test
        try {
          completer.complete(Future.sync(onTimeout));
        } catch (e, st) {
          completer.completeError(e, st);
        }
      } else {
        completer
            .completeError(TimeoutException('Future not completed', timeout));
      }
    });
  }
  return responsePort.sendPort;
}

/// Creates a [Future], and a [SendPort] that can be used to complete that
Future<R> singleResponseFuture<R>(
  void Function(SendPort responsePort) action, {
  @Deprecated("Use singleResponseFutureWithTimeout instead") Duration? timeout,
  @Deprecated("Use singleResponseFutureWithTimeout instead") R? timeoutValue,
}) {
  if (timeout == null) {
    return _singleResponseFuture<R>(action);
  }
  if (timeoutValue is! R) {
    throw ArgumentError.value(
        null, "timeoutValue", "The result type is non-null");
  }
  return singleResponseFutureWithTimeout(action, timeout, timeoutValue);
}

/// Helper function for [singleResponseFuture].
///
/// Use this as the implementation of [singleResponseFuture]
/// when removing the deprecated parameters.
Future<R> _singleResponseFuture<R>(
    void Function(SendPort responsePort) action) {
  var completer = Completer<R>.sync();
  var responsePort = RawReceivePort();
  var zone = Zone.current;
  responsePort.handler = (response) {
    responsePort.close();
    zone.run(() {
      _castComplete<R>(completer, response);
    });
  };
  try {
    action(responsePort.sendPort);
  } catch (error, stack) {
    responsePort.close();
    // Delay completion because completer is sync.
    scheduleMicrotask(() {
      completer.completeError(error, stack);
    });
  }
  return completer.future;
}

/// Same as [singleResponseFuture], but with required [timeoutValue],
/// this allows us not to require a nullable return value
Future<R> singleResponseFutureWithTimeout<R>(
    void Function(SendPort responsePort) action,
    Duration timeout,
    R timeoutValue) {
  var completer = Completer<R>.sync();
  var responsePort = RawReceivePort();
  var timer = Timer(timeout, () {
    responsePort.close();
    completer.complete(timeoutValue);
  });
  var zone = Zone.current;
  responsePort.handler = (response) {
    responsePort.close();
    timer.cancel();
    zone.run(() {
      _castComplete<R>(completer, response);
    });
  };
  try {
    action(responsePort.sendPort);
  } catch (error, stack) {
    responsePort.close();
    timer.cancel();
    // Delay completion because completer is sync.
    scheduleMicrotask(() {
      completer.completeError(error, stack);
    });
  }
  return completer.future;
}

/// Send the result of a future, either value or error, as a message.
void sendFutureResult(Future<Object?> future, SendPort resultPort) {
  future.then<void>((value) {
    resultPort.send(list1(value));
  }, onError: (error, stack) {
    resultPort.send(list2('$error', '$stack'));
  });
}

/// Creates a [Future], and a [SendPort] that can be used to complete that
Future<R> singleResultFuture<R>(void Function(SendPort responsePort) action,
    {Duration? timeout, FutureOr<R> Function()? onTimeout}) {
  var completer = Completer<R>.sync();
  var port = singleCompletePort<R, List<Object?>>(completer,
      callback: receiveFutureResult, timeout: timeout, onTimeout: onTimeout);
  try {
    action(port);
  } catch (e, s) {
    // This should not happen.
    sendFutureResult(Future.error(e, s), port);
  }
  return completer.future;
}

/// Completes a completer with a message created by [sendFutureResult]
///
/// The [response] must be a message on the format sent by [sendFutureResult].
void completeFutureResult<R>(List<Object?> response, Completer<R> completer) {
  if (response.length == 2) {
    var error = RemoteError(response[0] as String, response[1] as String);
    completer.completeError(error, error.stackTrace);
  } else {
    var result = response[0] as R;
    completer.complete(result);
  }
}

/// Converts a received message created by [sendFutureResult] to a future
Future<R> receiveFutureResult<R>(List<Object?> response) {
  if (response.length == 2) {
    var error = RemoteError(response[0] as String, response[1] as String);
    return Future.error(error, error.stackTrace);
  }
  var result = response[0] as R;
  return Future<R>.value(result);
}

/// A [Future] and a [SendPort] that can be used to complete the future.
class SingleResponseChannel<R> {
  final Zone _zone;
  final RawReceivePort _receivePort;
  final Completer<R> _completer;
  final FutureOr<R> Function(dynamic)? _callback;
  Timer? _timer;

  /// Creates a response channel.
  SingleResponseChannel(
      {FutureOr<R> Function(dynamic value)? callback,
      Duration? timeout,
      bool throwOnTimeout = false,
      FutureOr<R> Function()? onTimeout,
      R? timeoutValue})
      : _receivePort = RawReceivePort(),
        _completer = Completer<R>.sync(),
        _callback = callback,
        _zone = Zone.current {
    _receivePort.handler = _handleResponse;
    if (timeout != null) {
      if (!throwOnTimeout &&
          onTimeout == null &&
          timeoutValue == null &&
          timeoutValue is! R) {
        _receivePort.close();
        throw ArgumentError.value(null, "timeoutValue",
            "The value is needed and the result must not be null");
      }
      _timer = Timer(timeout, () {
        // Executed as a timer event.
        _receivePort.close();
        if (!_completer.isCompleted) {
          if (throwOnTimeout) {
            _completer.completeError(
                TimeoutException('Timeout waiting for response', timeout));
          } else if (onTimeout != null) {
            _completer.complete(Future.sync(onTimeout));
          } else {
            _completer.complete(timeoutValue as R);
          }
        }
      });
    }
  }

  /// The port expecting a value that will complete [result].
  SendPort get port => _receivePort.sendPort;

  /// Future completed by the first value sent to [port].
  Future<R> get result => _completer.future;

  /// If the channel hasn't completed yet, interrupt it and complete the result.
  ///
  /// If the channel hasn't received a value yet, or timed out, it is stopped
  /// (like by a timeout) and the [SingleResponseChannel.result]
  /// is completed with [result].
  /// If the result type is not nullable, the [result] must not be `null`.
  void interrupt([R? result]) {
    if (result is! R) {
      throw ArgumentError.value(null, "result",
          "The value is needed and the result must not be null");
    }
    _receivePort.close();
    _cancelTimer();
    if (!_completer.isCompleted) {
      // Not in event tail position, so complete the sync completer later.
      _completer.complete(Future.microtask(() => result));
    }
  }

  void _cancelTimer() {
    final timer = _timer;
    if (timer != null) {
      timer.cancel();
      _timer = null;
    }
  }

  void _handleResponse(v) {
    // Executed as a port event.
    _receivePort.close();
    _cancelTimer();
    final callback = _callback;
    if (callback == null) {
      try {
        _completer.complete(v as R);
      } catch (e, s) {
        _completer.completeError(e, s);
      }
    } else {
      // The _handleResponse function is the handler of a RawReceivePort.
      _zone.run(() {
        _completer.complete(Future<R>.sync(() => callback(v)));
      });
    }
  }
}

void _castComplete<R>(Completer<R> completer, Object? value) {
  try {
    completer.complete(value as R);
  } catch (error, stack) {
    completer.completeError(error, stack);
  }
}
